Glue for RayでRedshiftにData APIで接続してみた
データアナリティクス事業本部の笠原です。
Glue for RayはGA時点でもVPC内への接続はできません。 そのため、VPC内のリソースである、RDSやRedshiftへはパブリック経路で接続する必要があります。
ただ、RDSやRedshiftへの接続には、Data APIという別の方法があります。 これが使えれば、Glue for RayでもVPC内のプライベートなRDSやRedshiftにアクセスできるはずです。
今回は、Glue for RayでRedshift Data APIを使って、パブリックに公開されていないプライベートなRedshiftクラスタに接続してみます。
概要図
RedshiftはVPC内で起動します。 VPCにはインターネットゲートウェイはアタッチしませんし、Redshiftはパブリックアクセスをしません。 また、GlueジョブにはGlue接続はアタッチしません。
Data APIを利用する際、今回はSecrets Managerを利用します。
準備
最初に、Cloudformationテンプレートで一通り作成します。 まずは、Redshiftと、Redshiftを配置するためのVPCを作成します。
Redshiftは、シングルノードの dc2.large
を指定しています。
ノードタイプは、Redshift Data APIに対応しているノードタイプを選びましょう。
AWSTemplateFormatVersion: "2010-09-09" Parameters: MasterUserName: Type: String Default: defaultuser AllowedPattern: "([a-z])([a-z]|[0-9])*" MasterUserPassword: Type: String NoEcho: true Resources: ## Redshift RedshiftCluster: Type: AWS::Redshift::Cluster Properties: ClusterType: single-node NodeType: dc2.large DBName: glueraysampledb MasterUsername: !Ref MasterUserName MasterUserPassword: !Ref MasterUserPassword ClusterParameterGroupName: !Ref RedshiftClusterParameterGroup VpcSecurityGroupIds: - !Ref RedshiftClusterSecurityGroup ClusterSubnetGroupName: !Ref RedshiftClusterSubnetGroup PubliclyAccessible: false Port: 5439 RedshiftClusterParameterGroup: Type: AWS::Redshift::ClusterParameterGroup Properties: Description: Cluster Parameter Group ParameterGroupFamily: redshift-1.0 Parameters: - ParameterName: enable_user_activity_logging ParameterValue: true RedshiftClusterSubnetGroup: Type: AWS::Redshift::ClusterSubnetGroup Properties: Description: Cluster Subnet Group SubnetIds: - !Ref ClusterSubnet ## VPC VPC: Type: AWS::EC2::VPC Properties: CidrBlock: 10.0.0.0/16 ClusterSubnet: Type: AWS::EC2::Subnet Properties: CidrBlock: 10.0.1.0/24 VpcId: !Ref VPC AvailabilityZone: ap-northeast-1a RedshiftClusterSecurityGroup: Type: AWS::EC2::SecurityGroup Properties: GroupName: RedshiftClusterSecurityGroup GroupDescription: Security Group VpcId: !Ref VPC SecurityGroupIngress: - CidrIp: 10.0.0.0/16 FromPort: 5439 ToPort: 5439 IpProtocol: tcp
次に、Glueジョブで必要なIAMロールも作成します。
AWSTemplateFormatVersion: "2010-09-09" Resources: ## IAM GlueJobRoleWithRedshiftDataAPI: Type: AWS::IAM::Role Properties: RoleName: glue-ray-redshift-job-role AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - glue.amazonaws.com Action: sts:AssumeRole ManagedPolicyArns: - arn:aws:iam::aws:policy/AmazonS3FullAccess - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole - arn:aws:iam::aws:policy/AmazonRedshiftDataFullAccess - arn:aws:iam::aws:policy/AmazonRedshiftFullAccess
データ読み込み時に出力先となるS3バケットも用意しておきましょう。
AWSTemplateFormatVersion: "2010-09-09" Resources: ## S3 GlueRaySampleBucket: Type: AWS::S3::Bucket Properties: BucketName: glue-ray-sample-kasahara PublicAccessBlockConfiguration: BlockPublicAcls: True BlockPublicPolicy: True IgnorePublicAcls: True RestrictPublicBuckets: True
続いて、Redshiftに今回のテスト用のデータを作成します。 クエリエディタv2に入って、以下のSQLを実行しました。
-- スキーマ & テーブル作成 CREATE SCHEMA rayschema; CREATE TABLE rayschema.posts ( id INTEGER NOT NULL, title VARCHAR(255) NOT NULL, description VARCHAR(255) );
-- データを新規作成 INSERT INTO rayschema.posts (id, title, description) VALUES (1, 'タイトル1', '本文です。'); INSERT INTO rayschema.posts (id, title, description) VALUES (2, 'タイトル2', '本文です。'); INSERT INTO rayschema.posts (id, title, description) VALUES (3, 'タイトル3', '本文です。'); INSERT INTO rayschema.posts (id, title, description) VALUES (4, 'タイトル4', '本文です。'); INSERT INTO rayschema.posts (id, title, description) VALUES (5, 'タイトル5', '本文です。'); INSERT INTO rayschema.posts (id, title, description) VALUES (6, 'タイトル6', '本文です。'); INSERT INTO rayschema.posts (id, title, description) VALUES (7, 'タイトル7', '本文です。'); INSERT INTO rayschema.posts (id, title, description) VALUES (8, 'タイトル8', '本文です。'); INSERT INTO rayschema.posts (id, title, description) VALUES (9, 'タイトル9', '本文です。'); INSERT INTO rayschema.posts (id, title, description) VALUES (10, 'タイトル10', '本文です。'); INSERT INTO rayschema.posts (id, title, description) VALUES (11, 'タイトル11', '本文です。'); INSERT INTO rayschema.posts (id, title, description) VALUES (12, 'タイトル12', '本文です。'); INSERT INTO rayschema.posts (id, title, description) VALUES (13, 'タイトル13', '本文です。'); INSERT INTO rayschema.posts (id, title, description) VALUES (14, 'タイトル14', '本文です。'); INSERT INTO rayschema.posts (id, title, description) VALUES (15, 'タイトル15', '本文です。'); INSERT INTO rayschema.posts (id, title, description) VALUES (16, 'タイトル16', '本文です。'); INSERT INTO rayschema.posts (id, title, description) VALUES (17, 'タイトル17', '本文です。'); INSERT INTO rayschema.posts (id, title, description) VALUES (18, 'タイトル18', '本文です。'); INSERT INTO rayschema.posts (id, title, description) VALUES (19, 'タイトル19', '本文です。'); INSERT INTO rayschema.posts (id, title, description) VALUES (20, 'タイトル20', '本文です。');
最後に、Secrets Managerでシークレット情報を作成します。 今回はテストなので、Redshift作成時に生成したスーパーユーザのユーザ名とパスワードのシークレット情報を作成します。
また、タグに RedshiftDataFullAccess
を設定します。値は空でOKです。
これは、 マネージドポリシーである AmazonRedshiftDataFullAccess
が Secrets Managerからシークレット情報を取得する際の条件として、リソースタグ名が RedshiftDataFullAccess
のシークレットのみ取得可能と指定されているためです。
Glue for Rayジョブ作成
マネジメントコンソールから、Glue Studio上でRayジョブを作成しましょう。 GlueジョブのIAMロールは、先ほど作成したロールを設定してください。
データの読み込み
今回は簡単のため、AWS SDK for pandas (AWS Wrangler)を使います。 AWS SDK for pandasには、Redshift Data APIを使ってデータを読み込むことが簡単にできる機能が用意されています。
引数にて --pip-install
でインストールするパッケージを指定します。
今回は、以下のパッケージをインストールします。
awswrangler
のバージョンは、Rayがプレビューだった時に利用できたバージョンにしてます。
modin[ray], tqdm, awswrangler==3.1.1
コードは以下の通りにしました。
import ray import awswrangler as wr ## AWS SDK for Pandasの実行エンジン(Ray)とメモリフォーマット(Modin)の確認 print(f"Execution Engine: {wr.engine.get()}") print(f"Memory Format: {wr.memory_format.get()}") ## Redshift Data API 接続情報定義 con_redshift = wr.data_api.redshift.connect( cluster_id="<RedshiftクラスタID>", database="glueraysampledb", ## Redshiftデータベース名 secret_arn="<事前に作成したシークレットのARN>" ) ## SQLでデータ読み込み df = wr.data_api.redshift.read_sql_query( sql="SELECT * FROM rayschema.posts", con=con_redshift, ) ## 内容確認 print(df.head(5)) print(df.info()) ## S3へ書き込み wr.s3.to_parquet( df=df, path="s3://glue-ray-sample-kasahara/ray/redshift-data-api/outputs/", index=False )
print文の出力内容は Cloudwatch Logsのロググループ /aws-glue/ray/jobs/script-log
に出力されています。
また、S3バケットにもParquetファイルが出力されています。
データの書き込み
AWS SDK for pandasで用意されているメソッドには、データ書き込みに関するものがありません。 そこで、データ書き込みについては、boto3を用いてInsertしていきたいと思います。
以下の記事を参考にしました。
以下のようなコードで500件追加してみました。
import ray import boto3 import time import json ray.init() ## Redshift接続情報定義 CLUSTER_ID='<RedshiftクラスタID>' DATABASE_NAME='glueraysampledb' ## Redshiftデータベース名 SECRET_ARN='<事前に作成したシークレットのARN>' ## 実行するSQL SQL = ''' INSERT INTO rayschema.posts (id, title, description) VALUES (:id, :title, :description); ''' ## データ書き込み件数 max_count=500 offset=20 ## 書き込みタスクの定義 @ray.remote def insert_data(id): data_client = boto3.client('redshift-data') ## SQLクエリの実行 result = data_client.execute_statement( ClusterIdentifier=CLUSTER_ID, Database=DATABASE_NAME, SecretArn=SECRET_ARN, Sql=SQL, Parameters=[ {'name': 'id', 'value': f'{id}'}, {'name': 'title', 'value': f'タイトル{id}'}, {'name': 'description', 'value': '本文です。'}, ] ) ## SQL実行IDを確認 statement_id = result['Id'] ## SQLクエリ実行が終わるまで待つ statement=None status='' while status != 'FINISHED' and status != 'FAILED' and status != 'ABORTED': statement = data_client.describe_statement(Id=statement_id) status = statement['Status'] time.sleep(1) ## 結果の応答 if status == 'FINISHED': if int(statement['ResultSize']) > 0: statement = data_client.get_statement_result(Id=statement_id) print(json.dumps(statement['Records'])) return 0 else: print('QUERY FINISHED.') return 0 elif status == 'FAILED': print(f'QUERY FAILED.\n{statement}') return 1 elif status == 'ABORTED': print('QUERY ABORTED: The query run was stopped by the user.') return 1 ## タスクの実行 futures = [insert_data.remote(id) for id in range(offset+1, max_count+offset+1)] results = ray.get(futures) if 1 in results: print('QUERY FAILED.') else: print('QUERY COMPLETED.')
タスクの途中で、クエリ結果取得待ちで time.sleep(1)
を入れているので、実行結果はそこまで早くなりません。
また、 boto3.client('redshift-data')
の定義はRayタスクの中で行なっていますが、これは元々Rayタスクの外で定義してました。
## 書き込みタスクの定義 @ray.remote def insert_data(id, data_client): ## SQLクエリの実行 ## <省略> ## タスクの実行 client = ray.put(boto3.client('redshift-data')) futures = [insert_data.remote(id, client) for id in range(offset+1, max_count+offset+1)]
ただし、実行時に以下のエラーが出たため、やむなくRayタスクの中で定義しています。
TypeError: Could not serialize the put value <botocore.client.RedshiftDataAPIService object at 0x7f6cc96ac0>
Redshiftクエリエディタv2にて、件数が増えていることが確認できます。 元々20件あって、追加で500件増えたため、トータル520件になってます。
まとめ
いかがでしたでしょうか。 Glue for RayはまだVPCへプライベート接続できないですが、Data APIを使うことでプライベートなRedshiftクラスタに接続できることが確認できました。 VPCへのプライベート接続ができるまでは、Data APIを使っていきましょう。